2021 iThome 鐵人賽 — DAY 9
30 Days to a Basic Understanding of NLP: Self-Study Notes, part 9

[Day9] POS Tagging (4): Implementing a POS Tagger in Python


1. Data Preparation

The code here is adapted from a Coursera course, modified into a Chinese example for my own needs.
The data comes as a CoNLL-U file, so we first use a package for that format to parse it.

## Data preparation
!pip install conllu
from io import open
from conllu import parse_incr
import string                          # needed by assign_unk below
from collections import defaultdict    # needed for the frequency counts below
  • Organize the data into the desired format: one list holding the sentences and one holding the POS tags
def arrange_dataset(file_name: str, start_word_token: str, start_pos_token: str)-> dict:
    data_file = open(file_name, "r", encoding="utf-8")
    # parse the dataset sentence by sentence
    sentences_list = []
    pos_list = []

    for tokenlist in parse_incr(data_file):
        temp_str = [start_word_token]
        temp_pos = [start_pos_token]

        for s in tokenlist:
            temp_str.append(s['form'])
            temp_pos.append(s['upos'])
            
        sentences_list.append(temp_str)
        pos_list.append(temp_pos)
    
    return {
        'sentences_list': sentences_list,
        'pos_list': pos_list,
    }
# process the train and test data
train_file_path = 'UD_Chinese-GSD-master/zh_gsd-ud-train.conllu'
test_file_path = 'UD_Chinese-GSD-master/zh_gsd-ud-test.conllu'
start_w_token = '--s--'
start_pos_token = '--n--'

train_data_dict = arrange_dataset(train_file_path, start_w_token, start_pos_token)
test_data_dict = arrange_dataset(test_file_path, start_w_token, start_pos_token)

train_sentence, train_pos = train_data_dict['sentences_list'], train_data_dict['pos_list']
test_sentence, test_pos = test_data_dict['sentences_list'], test_data_dict['pos_list']
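A quick peek at the first training example (my own check, not in the original post; judging from the vocab dump in the next step, the first sentence starts with '看似 簡單 ,'):

# first few tokens of the first training sentence and their UPOS tags
print(train_sentence[0][:4])  # ['--s--', '看似', '簡單', ',']
print(train_pos[0][:4])       # the corresponding tags, starting with '--n--'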
  • Record every word in a dictionary (word → integer id)
# vocab: map each word to an integer id
vocab = {}
cnt_word = 0

# count how many times each word appears
freq = defaultdict(int)

for sentence in train_sentence: 
    for word in sentence:
        if word not in vocab:
            vocab[word] = cnt_word
            cnt_word += 1
            
        freq[word] += 1
    
print("字典:")
cnt = 0
for k, v in vocab.items():
    print(f"{k}:{v}")
    cnt += 1
    if cnt > 20:
        break

# output: 
# vocab:
# --s--:0
# 看似:1
# 簡單:2
# ,:3
  • A function to assign unk tokens to out-of-vocabulary words
def assign_unk(word):
    # words containing punctuation get their own unk token
    punct = set(string.punctuation)
    if any(char in punct for char in word):
        return "--unk_punct--"
    return "--unk--"
  • Look up a word and its tag, replacing OOV words with an unk token
def get_word_tag(word, pos_tag, vocab):
    if word not in vocab:
        word = assign_unk(word)
    return word, pos_tag
get_word_tag('tardigrade', 'NN', vocab)
# output: ('--unk--', 'NN')

2. Implementing the HMM: the three matrices it needs

Before computing these matrices we first need three sets of counts (a toy walkthrough follows this list):
* Transition counts: how often one POS tag follows another, stored in transition_counts
* Emission counts: how often each word occurs with each POS tag, stored in emission_counts
* Tag counts: how often each tag occurs, stored in tag_counts
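As a minimal toy walkthrough (a hypothetical two-word sentence of my own, not from the dataset), this is how the three counts accumulate:

# trace the counting logic on one tiny sentence with a start token
toy_words = ['--s--', '吃', '飯']
toy_tags = ['--n--', 'VERB', 'NOUN']

toy_transition = defaultdict(int)
toy_emission = defaultdict(int)
toy_tag_counts = defaultdict(int)

prev_tag = toy_tags[0]            # '--n--'
toy_tag_counts[prev_tag] += 1
for word, tag in zip(toy_words[1:], toy_tags[1:]):
    toy_transition[(prev_tag, tag)] += 1   # ('--n--','VERB'), then ('VERB','NOUN')
    toy_emission[(tag, word)] += 1         # ('VERB','吃'), then ('NOUN','飯')
    toy_tag_counts[tag] += 1
    prev_tag = tag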

import pandas as pd
from collections import defaultdict
import math
import numpy as np

def create_three_HMM_matrix(sentences_list: list, pos_list: list) -> tuple:
    
    emission_counts = defaultdict(int)
    transition_counts = defaultdict(int)
    tag_counts = defaultdict(int)

    sentence_len = len(sentences_list)
    
    i = 0 
    
    for sentence_idx in range(sentence_len):
        # start with the tag of the sentence's first token (the start tag)
        prev_tag = pos_list[sentence_idx][0]
        tag_counts[prev_tag] += 1
        
        for word_tag_idx in range(1, len(sentences_list[sentence_idx])):
            i += 1

            if i % 5000 == 0:
                print(f"word count = {i}")
            
            word, tag = get_word_tag(sentences_list[sentence_idx][word_tag_idx], 
                                     pos_list[sentence_idx][word_tag_idx], 
                                     vocab)

            transition_counts[(prev_tag, tag)] += 1
            emission_counts[(tag, word)] += 1
            tag_counts[tag] += 1
            prev_tag = tag
        
    return emission_counts, transition_counts, tag_counts
emission_counts, transition_counts, tag_counts = create_three_HMM_matrix(train_sentence, train_pos)
  • The set of POS tags:
# get all the POS states
states = sorted(tag_counts.keys())
print(f"number of POS tags: {len(states)}")
print(states)
# output: number of POS tags: 16
# ['--n--', 'ADJ', 'ADP', 'ADV', 'AUX', 'CCONJ', 'DET', 'NOUN', 'NUM', 'PART', 'PRON', 'PROPN', 'PUNCT', 'SYM', 'VERB', 'X']
  • Print a few entries of the transition and emission counts
print("transition counts: ")
for ex in list(transition_counts.items())[:3]:
    print(ex)
print()

print("emission counts: ")
for ex in list(emission_counts.items())[200:203]:
    print(ex)
    
# transition counts: 
# (('--n--', 'AUX'), 8)
# (('AUX', 'ADJ'), 217)
# (('ADJ', 'PUNCT'), 464)

# emission counts: 
# (('PUNCT', ':'), 86)
# (('PUNCT', '「'), 332)
# (('VERB', '吃'), 9)
  • Next, convert the count tables into probability matrices (the smoothed formula is written out after this list):
    • transition_counts stores how often one tag follows another; e.g. (('--n--', 'AUX'), 8) means that '--n--' is followed by 'AUX' 8 times in the corpus
    • dividing (count of the tag pair) by (count of the previous tag) gives the probability of the next tag given the previous one, i.e. the transition_matrix; the tag counts live in tag_counts
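Written out with the add-alpha smoothing used in the code below, where $C(\cdot)$ is a count and $N$ the number of tags:

$$P(t_j \mid t_i) = \frac{C(t_i, t_j) + \alpha}{C(t_i) + \alpha N}$$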
def get_transition_matrix(alpha, tag_counts, transition_counts):
    
    # get the sorted list of tags
    all_tags = sorted(tag_counts.keys())
    
    # initialize the transition matrix; since it holds tag-to-tag probabilities,
    # its size is (number of tags) x (number of tags)
    transition_matrix = np.zeros((len(all_tags), len(all_tags)))
    
    for i in range(len(all_tags)):
        for j in range(len(all_tags)):
            count = 0
            key = (all_tags[i], all_tags[j])
            # take the count for this tag pair, if it was observed
            if key in transition_counts:
                count = transition_counts.get(key)
            # count of the previous tag
            count_prev_tag = tag_counts[all_tags[i]]
            
            # alpha smoothing avoids zero probabilities and a zero denominator
            transition_matrix[i,j] = (count + alpha)/(count_prev_tag + alpha*len(all_tags))
    
    return transition_matrix
alpha = 0.001
transition_matrix = get_transition_matrix(alpha, tag_counts, transition_counts)

print(f"transition_matrix [0][0]: {transition_matrix[0,0]:.9f}")
print(f"transition_matrix [3][1]: {transition_matrix[3,1]:.4f}")

print("transition matrix:")
transition_sub = pd.DataFrame(transition_matrix[0:4,0:4], index=states[0:4], columns = states[0:4])
print(pd.DataFrame(transition_matrix[0:4,0:4], index=states[0:4], columns = states[0:4]))

# transition_matrix [0][0]: 0.000000250
# transition_matrix [3][1]: 0.0709
# transition matrix:
#               --n--       ADJ       ADP       ADV
# --n--  2.501866e-07  0.017513  0.130598  0.060545
# ADJ    4.086610e-07  0.024929  0.007356  0.016756
# ADP    2.205550e-07  0.024702  0.017424  0.034407
# ADV    2.181969e-07  0.070914  0.088152  0.089243
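As a quick sanity check (my own addition, not in the original notebook), each row of transition_matrix should sum to roughly 1. It will not be exact, since tag_counts also counts sentence-final tags, which have no outgoing transition, so rows for tags that often end a sentence (such as PUNCT) sum noticeably below 1:

# row sums of the smoothed transition matrix; most should be close to 1
print(transition_matrix.sum(axis=1))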
  • Converting the counts into the emission_matrix (the smoothed formula is written out after this list):
    • emission_counts stores how often each word occurs under each tag; e.g. (('PUNCT', ':'), 86) means that ':' is tagged 'PUNCT' 86 times in the corpus
    • dividing (count of the word under the tag) by (count of the tag) gives the probability of the word given the tag, i.e. the emission_matrix; the tag counts again live in tag_counts
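The emission probabilities use the same add-alpha smoothing, with $V$ the vocabulary size:

$$P(w \mid t) = \frac{C(t, w) + \alpha}{C(t) + \alpha V}$$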
def get_emission_matrix(alpha, tag_counts, emission_counts, vocab):
    
    num_tags = len(tag_counts)
    
    all_tags = sorted(tag_counts.keys())
    
    num_words = len(vocab)
    
    # initialize emission_matrix; since it holds P(word | tag), its size is
    # (number of tags) x (vocabulary size)
    emission_matrix = np.zeros((num_tags, num_words))


    for i in range(num_tags):
        for j in range(num_words):

            count = 0
            # note: 'vocab' here is passed in as a list of words, so vocab[j] is the j-th word
            key = (all_tags[i], vocab[j])

            if key in emission_counts: 
                count = emission_counts.get(key)
                
            count_tag = tag_counts[all_tags[i]]
                
            emission_matrix[i,j] = (count+alpha)/(num_words*alpha + count_tag)

    return emission_matrix
# create the emission probability matrix; this takes a few minutes to run
emission_matrix = get_emission_matrix(alpha, tag_counts, emission_counts, list(vocab))

print(f"emission_matrix[0][0]: {emission_matrix[0,0]:.9f}")
print(f"emission_matrix[3][1]: {emission_matrix[3,1]:.9f}")

# Try viewing emissions for a few words in a sample dataframe
cidx  = ['其實','決擇','出身', '10']

# Get the integer ID for each word
cols = [vocab[a] for a in cidx]

# Choose POS tags to show in a sample dataframe
rvals =['--n--', 'ADP', 'ADV', 'AUX']

# For each POS tag, get the row number from the 'states' list
rows = [states.index(a) for a in rvals]

# Get the emissions for the sample of words, and the sample of POS tags
emission_matrix_sub = pd.DataFrame(emission_matrix[np.ix_(rows,cols)], index=rvals, columns = cidx )
print(emission_matrix_sub)

# emission_matrix[0][0]: 0.000000249
# emission_matrix[3][1]: 0.000000217
#                  其實            決擇            出身            10
# --n--  2.490900e-07  2.490900e-07  2.490900e-07  2.490900e-07
# ADP    2.197023e-07  2.197023e-07  2.197023e-07  2.197023e-07
# ADV    1.956478e-03  2.173623e-07  2.173623e-07  2.173623e-07
# AUX    3.447547e-07  3.447547e-07  3.447547e-07  3.447547e-07

3. Implementing Viterbi

  • With the emission_matrix and transition_matrix in hand, we use the Viterbi algorithm to find the most probable tag path for a sentence. The algorithm has three parts (the forward-pass recurrence is written out after this list):

    • Initialization: starting from '--s--', compute the probability of each tag coming right after it
    • Forward pass: just like computing day 2's sunny/rainy probability from day 1 in the earlier weather example, compute each word's probability under every tag and record the index of the best previous tag
    • Backward pass: find the most probable tag for the last word, then follow the recorded indices of previous tags back to '--s--' to read off the whole sequence
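In log space, as the code works, the forward recurrence for word position $i$ and tag $j$ is

$$\text{best\_probs}[j,i] = \max_k \left( \text{best\_probs}[k,i-1] + \log A[k,j] + \log B[j, w_i] \right)$$

where $A$ is the transition_matrix, $B$ the emission_matrix, and $w_i$ the vocab index of the $i$-th word; best_paths[j,i] stores the arg max $k$.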
  • Initialization:

def initialize(states, tag_counts, transition_matrix, emission_matrix, corpus, vocab):

    num_tags = len(tag_counts)
    
    best_probs = np.zeros((num_tags, len(corpus)))
    best_paths = np.zeros((num_tags, len(corpus)), dtype=int)
    
    s_idx = states.index("--n--")
    
    for i in range(num_tags):
        # with alpha smoothing this entry is never exactly 0,
        # but guard against log(0) anyway
        if transition_matrix[s_idx, i] == 0:
            best_probs[i,0] = float('-inf')
        else:
            best_probs[i,0] = math.log(transition_matrix[s_idx][i]) + math.log(emission_matrix[i][vocab["--s--"]])

    return best_probs, best_paths

Only column 0 is filled in (starting from '--s--', we compute the probability of each tag that follows it):

best_probs, best_paths = initialize(states, tag_counts, transition_matrix, emission_matrix, test_sentence[0], vocab)
# Test the function
print(f"best_probs[0,0]: {best_probs[0, 0]}")
print(f"best_paths[2,3]: {best_paths[2, 3]}")

# output:
# best_probs[0,0]: -30.40651015286206
# best_paths[2,3]: 0
  • Forward pass:
def viterbi_forward(A, B, test_corpus, best_probs, best_paths, vocab):
   
    num_tags = best_probs.shape[0]

    # note: this assumes every word of test_corpus is in vocab; OOV words
    # would first need to be mapped to an unk token (see assign_unk above)
    for i in range(1, len(test_corpus)): 
        for j in range(num_tags):
            
            best_prob_i = float('-inf')
            
            best_path_i = None

            for k in range(num_tags):
            
                prob = best_probs[k][i-1] + math.log(A[k][j]) + math.log(B[j][vocab[test_corpus[i]]]) 

                if best_prob_i < prob:

                    best_prob_i = prob
                    best_path_i = k

            best_probs[j,i] = best_prob_i
            best_paths[j,i] = best_path_i

    return best_probs, best_paths
# update best_probs and best_paths
best_probs, best_paths = viterbi_forward(transition_matrix, emission_matrix, test_sentence[0], best_probs, best_paths, vocab)
  • Backward pass:
def viterbi_backward(best_probs, best_paths, corpus, states):

    m = best_paths.shape[1] 

    z = [None] * m

    num_tags = best_probs.shape[0]

    best_prob_for_last_word = float('-inf')

    pred = [None] * m
    
    for k in range(num_tags):

        if best_probs[k][m-1] > best_prob_for_last_word:

            best_prob_for_last_word = best_probs[k][m-1]

            z[m - 1] = k
            
    pred[m - 1] = states[z[m - 1]]
    
    # walk backwards from the end; stopping at i = 1 avoids the i - 1 index
    # wrapping around to -1 and overwriting the last prediction
    for i in range(m - 1, 0, -1):
        
        pos_tag_for_word_i = z[i]
        
        z[i - 1] = best_paths[pos_tag_for_word_i][i]
        
        pred[i - 1] = states[z[i-1]]
        
    return pred
  • Let's predict the word tags~:
pred = viterbi_backward(best_probs, best_paths, test_sentence[0], states)
print('The prediction for pred[0:7] is: \n', pred[0:7], "\n", test_sentence[0][0:7])

# output:
# The prediction for pred[0:7] is: 
#  ['PRON', 'ADV', 'PUNCT', 'PRON', 'PART', 'NOUN', 'ADV'] 
#  ['--s--', '然而', ',', '這樣', '的', '處理', '也']
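As a rough check (my own addition), we can compare this prediction against the gold tags for the same sentence:

# fraction of positions where the Viterbi prediction matches the gold tag
correct = sum(p == t for p, t in zip(pred, test_pos[0]))
print(f"per-token accuracy on this sentence: {correct / len(pred):.3f}")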

Today's post is on the long side; if the code is unfamiliar, trace it line by line. The Viterbi part in particular took me a long time to work out. Link [1] is the course's sample code, which you're welcome to look at. Tomorrow I'll start covering vector representations of words and sentences~~

References
[1] Sample code

